package com.atguigu.flink.datastreamapi.commonapi;

import org.apache.flink.api.common.functions.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Created by Smexy on 2023/11/11

 map处理时，需要读取数据库。
        map的每个Task在处理数据之前，提前创建好连接，只创建一次。
                使用连接处理当前Task收到的每一条数据
            每个Task在处理数据完成之后，再关闭连接，只关闭一次。


    Task需要执行生命周期方法，需要实现 RichFunction
    RichFunction提供了:
        open(): Task被创建完后，执行一次。
        close(): Task被销毁之前，执行一次。
 */
public class Demo2_RichMap
{
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2);

        env
            .fromElements(1,2,3,4,5,6) //非并行运行的算子
           /* .map(x -> {
                //创建连接
                System.out.println("创建连接");
                String a  = "连接";
                //使用连接
                System.out.println("使用了"+a);
                //关闭连接
                System.out.println("关闭"+a);
                return x + 10;
            })*/
            .map(new MyMapFunction3())
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /*
        为每一个算子都提供了 RichXxxFunction函数类型。
                Map操作有 RichMapFunction
                    class RichMapFunction<IN, OUT> extends AbstractRichFunction implements MapFunction<IN, OUT>
                     class AbstractRichFunction implements RichFunction, Serializable
     */
    private static class  MyMapFunction3 extends RichMapFunction<Integer,Integer>
    {

        private String a;

        //map处理
        @Override
        public Integer map(Integer x) throws Exception {
            //使用连接
            System.out.println("使用了"+a);
            return x + 10;
        }

        //Task被创建完成之后，做一件事
        @Override
        public void open(Configuration parameters) throws Exception {
            //创建连接
            System.out.println(getRuntimeContext().getTaskNameWithSubtasks()+"  创建连接");
            a = "连接";
        }

        //Task被销毁之前，执行一次。
        @Override
        public void close() throws Exception {
            //关闭连接
            System.out.println("关闭"+a);
        }

    }

    private static class  MyMapFunction2 extends AbstractRichFunction implements MapFunction<Integer,Integer>{

        private String a;

        //map处理
        @Override
        public Integer map(Integer x) throws Exception {
            //使用连接
            System.out.println("使用了"+a);
            return x + 10;
        }

        //Task被创建完成之后，做一件事
        @Override
        public void open(Configuration parameters) throws Exception {
            //创建连接
            System.out.println(getRuntimeContext().getTaskNameWithSubtasks()+"  创建连接");
            a = "连接";
        }

        //Task被销毁之前，执行一次。
        @Override
        public void close() throws Exception {
            //关闭连接
            System.out.println("关闭"+a);
        }

    }
    private static class  MyMapFunction  implements MapFunction<Integer,Integer>, RichFunction{

        private String a;

        //map处理
        @Override
        public Integer map(Integer x) throws Exception {
            //使用连接
            System.out.println("使用了"+a);
            return x + 10;
        }

        //Task被创建完成之后，做一件事
        @Override
        public void open(Configuration parameters) throws Exception {
            //创建连接
            System.out.println(getRuntimeContext().getTaskNameWithSubtasks()+"  创建连接");
            a = "连接";
        }

        //Task被销毁之前，执行一次。
        @Override
        public void close() throws Exception {
            //关闭连接
            System.out.println("关闭"+a);
        }

        private transient RuntimeContext runtimeContext;

        @Override
        public RuntimeContext getRuntimeContext() {
            if (this.runtimeContext != null) {
                return this.runtimeContext;
            } else {
                throw new IllegalStateException("The runtime context has not been initialized.");
            }

        }

        @Override
        public IterationRuntimeContext getIterationRuntimeContext() {
            if (this.runtimeContext == null) {
                throw new IllegalStateException("The runtime context has not been initialized.");
            } else if (this.runtimeContext instanceof IterationRuntimeContext) {
                return (IterationRuntimeContext) this.runtimeContext;
            } else {
                throw new IllegalStateException("This stub is not part of an iteration step function.");
            }
        }

        @Override
        public void setRuntimeContext(RuntimeContext t) {
            this.runtimeContext = t;
        }


    }
}
